package fun.lumia.flink.udtf;

import com.alibaba.fastjson.JSON;
import fun.lumia.flink.bean.LoadActUserLoop;
import fun.lumia.flink.bean.LoadActUserLoopContent;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * Created by LumiaO
 * on 2025/9/12.
 */
// 定义在SQL中调用UDTF函数后返回的数据格式
@FunctionHint(output = @DataTypeHint("ROW<dataclass String,cust_circuit_id String,data_time String,load_value Double,point String>"))
public class ExtractJsonFromMQ extends TableFunction<Row> {

    public void eval(String json_str) {
        String new_json_str = json_str.substring(1).substring(0,json_str.length()-2);
        // 将JSON字符串转换成Java对象
        LoadActUserLoop loadActUserLoop = JSON.parseObject(new_json_str, LoadActUserLoop.class);
        String dataclass = loadActUserLoop.getDataclass();
        // 循环提取data_content中的每一条数据
        for (LoadActUserLoopContent dataContent : loadActUserLoop.getDataContents()) {
            // 提取每一列数据
            String custCircuitId = dataContent.getCustCircuitId();
            String dataTime = dataContent.getDataTime();
            Double loadValue = dataContent.getLoadValue();
            String point = dataContent.getPoint();
            // 将每一列数据组合成一行数据（Row）输出
            collect(Row.of(dataclass,custCircuitId,dataTime,loadValue,point));
//            System.out.println(dataclass+','+custCircuitId+','+dataTime+','+loadValue+','+point);
        }
    }

    public static void main(String[] args) {
        String json_str = "{\"dataclass\":\"load_act_user_loop\",\"data_content\":[{\"cust_circuit_id\":\"124344656\",\"data_time\":\"2025-07-21 10:10:10\",\"load_value\":1000,\"point\":\"1\"},{\"cust_circuit_id\":\"124344656\",\"data_time\":\"2025-07-21 10:10:10\",\"load_value\":1000,\"point\":\"1\"}]}";
        new ExtractJsonFromMQ().eval(json_str);
//        String json_str_arr = "[{\"dataclass\":\"load_act_user_loop\",\"data_content\":[{\"cust_circuit_id\":\"124344656\",\"data_time\":\"2025-07-21 10:10:10\",\"load_value\":1000,\"point\":\"1\"},{\"cust_circuit_id\":\"124344656\",\"data_time\":\"2025-07-21 10:10:10\",\"load_value\":1000,\"point\":\"1\"}]}]";
//        new ExtractJsonFromMQ().eval(json_str_arr);
    }

}
